#!/usr/bin/env python3
"""Create or update a webhook integration screenshot using a test fixture."""

import argparse
import base64
import os
import re
import subprocess
import sys
from typing import Any
from urllib.parse import parse_qsl, urlencode

import zulip

SCREENSHOTS_DIR = os.path.abspath(os.path.dirname(__file__))
TOOLS_DIR = os.path.abspath(os.path.dirname(SCREENSHOTS_DIR))
ROOT_DIR = os.path.dirname(TOOLS_DIR)
sys.path.insert(0, ROOT_DIR)

# check for the venv
from tools.lib import sanity_check

sanity_check.check_venv(__file__)

from scripts.lib.setup_path import setup_path

setup_path()

os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings"
import django

django.setup()

import orjson
import requests

from scripts.lib.zulip_tools import BOLDRED, ENDC
from tools.lib.test_script import prepare_puppeteer_run
from zerver.actions.create_user import do_create_user, notify_created_bot
from zerver.actions.streams import bulk_add_subscriptions
from zerver.actions.user_settings import do_change_avatar_fields
from zerver.lib.integrations import (
    FIXTURELESS_INTEGRATIONS_WITH_SCREENSHOTS,
    INCOMING_WEBHOOK_INTEGRATIONS,
    INTEGRATIONS,
    FixturelessScreenshotConfig,
    IncomingWebhookIntegration,
    Integration,
    WebhookScreenshotConfig,
    get_fixture_path,
    get_image_path,
    split_fixture_path,
)
from zerver.lib.storage import static_path
from zerver.lib.streams import create_stream_if_needed
from zerver.lib.upload import upload_avatar_image
from zerver.lib.webhooks.common import get_fixture_http_headers, parse_multipart_string
from zerver.models import Message, UserProfile
from zerver.models.realms import get_realm
from zerver.models.users import get_user_by_delivery_email


def create_integration_bot(integration: Integration, bot_name: str | None = None) -> UserProfile:
    realm = get_realm("zulip")
    owner = get_user_by_delivery_email("iago@zulip.com", realm)
    email_prefix = (
        re.sub(r"[^a-zA-Z0-9_-]", "", bot_name.strip()).lower()[:64]
        if bot_name
        else integration.name
    )
    bot_email = f"{email_prefix}-bot@example.com"
    if bot_name is None:
        bot_name = f"{integration.display_name} Bot"
    try:
        bot = UserProfile.objects.get(email=bot_email)
    except UserProfile.DoesNotExist:
        bot = do_create_user(
            email=bot_email,
            password="123",
            realm=owner.realm,
            full_name=bot_name,
            bot_type=UserProfile.INCOMING_WEBHOOK_BOT,
            bot_owner=owner,
            acting_user=owner,
        )
        notify_created_bot(bot)

        bot_avatar_path = integration.get_bot_avatar_path()
        bot_avatar_path = static_path(bot_avatar_path)
        if os.path.isfile(bot_avatar_path):
            with open(bot_avatar_path, "rb") as f:
                upload_avatar_image(f, bot)
                do_change_avatar_fields(bot, UserProfile.AVATAR_FROM_USER, acting_user=owner)
        else:
            raise ValueError(f"Bot avatar file {bot_avatar_path} does not exist.")

    return bot


def create_integration_channel(channel_name: str, bot: UserProfile) -> None:
    assert isinstance(bot.bot_owner, UserProfile)
    realm = bot.bot_owner.realm
    channel, _created = create_stream_if_needed(realm, channel_name)
    bulk_add_subscriptions(realm, [channel], [bot, bot.bot_owner], acting_user=bot)


def get_fixture_info(fixture_path: str) -> tuple[Any, bool, bool, str]:
    json_fixture = fixture_path.endswith(".json")
    multipart_fixture = fixture_path.endswith(".multipart")
    _, fixture_name = split_fixture_path(fixture_path)

    if fixture_name:
        if not os.path.exists(fixture_path):
            raise FileNotFoundError(
                f"{BOLDRED}The fixture '{fixture_path}' does not exist. Please check the fixture file name and try again.{ENDC}"
            )
        if json_fixture:
            with open(fixture_path, "rb") as fb:
                data = orjson.loads(fb.read())
        else:
            with open(fixture_path) as f:
                data = f.read().strip()
            if multipart_fixture:
                data = parse_multipart_string(data)
    else:
        data = ""

    return data, json_fixture, multipart_fixture, fixture_name


def get_requests_headers(integration_dir_name: str, fixture_name: str) -> dict[str, Any]:
    headers = get_fixture_http_headers(integration_dir_name, fixture_name)

    def fix_name(header: str) -> str:
        return header.removeprefix("HTTP_").replace("_", "-")

    return {fix_name(k): v for k, v in headers.items()}


def custom_headers(headers_json: str) -> dict[str, str]:
    if not headers_json:
        return {}
    try:
        return orjson.loads(headers_json)
    except orjson.JSONDecodeError as ve:
        raise argparse.ArgumentTypeError(
            f"Encountered an error while attempting to parse custom headers: {ve}\n"
            "Note: all strings must be enclosed within \"\" instead of ''"
        )


def send_bot_payload_message(
    bot: UserProfile,
    integration: IncomingWebhookIntegration,
    fixture_path: str,
    config: WebhookScreenshotConfig,
) -> bool:
    # Delete all messages, so new message is the only one it's message group
    Message.objects.filter(realm_id=bot.realm_id, sender=bot).delete()
    data, json_fixture, multipart_fixture, fixture_name = get_fixture_info(fixture_path)

    headers = get_requests_headers(integration.dir_name, fixture_name)
    if config.custom_headers:
        headers.update(config.custom_headers)
    if config.use_basic_auth:
        credentials = base64.b64encode(f"{bot.email}:{bot.api_key}".encode()).decode()
        auth = f"basic {credentials}"
        headers.update(dict(Authorization=auth))

    assert isinstance(bot.bot_owner, UserProfile)
    channel_name = config.channel or integration.name
    url = f"{bot.bot_owner.realm.url}/{integration.url}"
    params = {"api_key": bot.api_key, "stream": channel_name}
    params.update(config.extra_params)

    extra_args = {}
    if multipart_fixture:
        extra_args = {"data": data}

    elif not json_fixture and data:
        assert isinstance(data, str)

        # fixtures with url parameters
        if "&" in data:
            # Overwrite the fixture params, in case of overlap.
            parsed_params = dict(parse_qsl(data))
            parsed_params.update(params)
            params = parsed_params

        # fixtures with plain/text payload
        else:
            extra_args = {"data": data}

    elif config.payload_as_query_param:
        params[config.payload_param_name] = orjson.dumps(data).decode()

    else:
        extra_args = {"json": data}

    url = f"{url}?{urlencode(params)}"

    try:
        response = requests.post(url=url, headers=headers, **extra_args)
    except requests.exceptions.ConnectionError:
        print(
            "This tool needs the local dev server to be running. "
            "Please start it using tools/run-dev before running this tool."
        )
        sys.exit(1)
    if response.status_code != 200:
        print(response.json())
        print("Failed to trigger webhook")
        return False
    else:
        print(f"Triggered {integration.name} webhook")
        return True


def send_bot_mock_message(bot: UserProfile, channel: str, topic: str, message: str) -> None:
    # Delete all messages, so that the new message is the only one in its message group
    Message.objects.filter(realm_id=bot.realm_id, sender=bot).delete()
    assert bot.bot_owner is not None
    url = f"{bot.bot_owner.realm.url}"
    client = zulip.Client(email=bot.email, api_key=bot.api_key, site=url)

    request = {"type": "stream", "to": channel, "topic": topic, "content": message}
    client.send_message(request)


def capture_last_message_screenshot(bot: UserProfile, image_path: str) -> None:
    message = Message.objects.filter(realm_id=bot.realm_id, sender=bot).last()
    realm = get_realm("zulip")
    if message is None:
        print(f"No message found for {bot.full_name}")
        return
    message_id = str(message.id)
    screenshot_script = os.path.join(SCREENSHOTS_DIR, "message-screenshot.ts")
    subprocess.check_call(["node", screenshot_script, message_id, image_path, realm.url])


def generate_screenshots_for_integration(integration: Integration) -> None:
    if integration.screenshot_configs is not None:
        for screenshot_config in integration.screenshot_configs:
            generate_screenshot_from_config(integration.name, screenshot_config)


def generate_screenshot_from_config(
    integration_name: str, screenshot_config: WebhookScreenshotConfig | FixturelessScreenshotConfig
) -> None:
    integration = INTEGRATIONS[integration_name]
    bot = create_integration_bot(integration, screenshot_config.bot_name)
    channel_name = screenshot_config.channel or integration.name
    create_integration_channel(channel_name, bot)
    image_path = get_image_path(integration, screenshot_config)

    if isinstance(integration, IncomingWebhookIntegration):
        assert isinstance(screenshot_config, WebhookScreenshotConfig)
        fixture_path = get_fixture_path(integration, screenshot_config)
        message_sent = send_bot_payload_message(bot, integration, fixture_path, screenshot_config)
    else:
        assert isinstance(screenshot_config, FixturelessScreenshotConfig)
        send_bot_mock_message(
            bot,
            channel=channel_name,
            topic=screenshot_config.topic,
            message=screenshot_config.message,
        )
        message_sent = True

    if message_sent:
        capture_last_message_screenshot(bot, image_path)
        print(f"Screenshot captured to: {BOLDRED}{image_path}{ENDC}")


parser = argparse.ArgumentParser()

batch_group = parser.add_mutually_exclusive_group(required=True)
batch_group.add_argument("--all", action="store_true")
batch_group.add_argument("--all-webhook", action="store_true")
batch_group.add_argument("--all-fixtureless", action="store_true")
batch_group.add_argument(
    "--skip-until",
    help="Name of the integration to start processing from, including all that follow in alphabetical order. Similar to --all",
)
batch_group.add_argument("--integration", nargs="+", help="Names of specific integrations")

common_group = parser.add_argument_group()
common_group.add_argument("--image-name", help="Name for the screenshot image")
common_group.add_argument("--image-dir", help="Directory name where to save the screenshot image")
common_group.add_argument("--bot-name", help="Name to use for the bot")

webhook_group = parser.add_argument_group("Webhook integrations options")
webhook_group.add_argument(
    "--fixture-name",
    help="Name of the fixture file to use. All other webhook options are ignored if the fixture is not specified.",
)
webhook_group.add_argument(
    "-A", "--use-basic-auth", action="store_true", help="Add basic auth headers to the request"
)
webhook_group.add_argument(
    "-Q",
    "--payload-as-query-param",
    action="store_true",
    help="Send payload as query param instead of body",
)
webhook_group.add_argument("-P", "--payload-param-name", help="Param name to use for the payload")
webhook_group.add_argument(
    "-H",
    "--custom-headers",
    type=custom_headers,
    help="Any additional headers to be sent with the request.",
)

fixtureless_group = parser.add_argument_group("Fixtureless non-webhook integrations options")
fixtureless_group.add_argument("-T", "--topic", help="Topic to use for the mock message")
fixtureless_group.add_argument("-M", "--message", help="Message to use for the mock message")
fixtureless_group.add_argument(
    "-C",
    "--channel",
    help="Channel name to use for the mock message. Ignored if --topic and --message are not specified.",
)

options = parser.parse_args()
prepare_puppeteer_run()


def validate_integration_count(
    options: argparse.Namespace, parser: argparse.ArgumentParser, option_name: str
) -> None:
    if len(options.integration) != 1:
        parser.error(
            f"Exactly one integration should be specified with --integration when --{option_name} is provided"
        )


def build_config(options: argparse.Namespace, config_keys: list[str]) -> dict[str, Any]:
    config = {
        key: getattr(options, key) for key in config_keys if getattr(options, key) is not None
    }
    return config


def generate_full_batch_screenshots(
    option_key: str,
    integration_list: list[Integration] | list[IncomingWebhookIntegration],
    batch_type: str,
) -> None:
    for key, value in vars(options).items():
        if key != option_key and value:
            print(f"Generating screenshots for {batch_type}. Ignoring all command-line options")
            break
    for integration in integration_list:
        generate_screenshots_for_integration(integration)


if options.all:
    generate_full_batch_screenshots("all", list(INTEGRATIONS.values()), "all integrations")
elif options.all_webhook:
    generate_full_batch_screenshots(
        "all_webhook", INCOMING_WEBHOOK_INTEGRATIONS, "all incoming webhook integrations"
    )
elif options.all_fixtureless:
    fixtureless_integrations_list = [
        INTEGRATIONS[name] for name in FIXTURELESS_INTEGRATIONS_WITH_SCREENSHOTS
    ]
    generate_full_batch_screenshots(
        "all_fixtureless", fixtureless_integrations_list, "all fixtureless integrations"
    )
elif options.skip_until:
    for key, value in vars(options).items():
        if key != "skip_until" and value:
            print(
                f"Generating screenshots for all integrations starting from {options.skip_until}. Ignoring all command-line options"
            )
            break
    skip = True
    for integration_name in sorted(INTEGRATIONS):
        if integration_name == options.skip_until:
            skip = False
        if skip:
            continue
        generate_screenshots_for_integration(INTEGRATIONS[integration_name])

elif options.fixture_name:
    validate_integration_count(options, parser, "fixture-name")
    webhook_options = [action.dest for action in webhook_group._group_actions]
    common_options = [action.dest for action in common_group._group_actions]
    config = build_config(options, webhook_options + common_options)
    webhook_screenshot_config = WebhookScreenshotConfig(**config)
    generate_screenshot_from_config(options.integration[0], webhook_screenshot_config)

elif options.topic or options.message:
    if not options.topic or not options.message:
        parser.error("Both --topic and --message must be specified together")
    validate_integration_count(options, parser, "topic")
    fixtureless_options = [action.dest for action in fixtureless_group._group_actions]
    common_options = [action.dest for action in common_group._group_actions]
    config = build_config(options, fixtureless_options + common_options)
    fixtureless_screenshot_config = FixturelessScreenshotConfig(**config)
    generate_screenshot_from_config(options.integration[0], fixtureless_screenshot_config)

elif options.integration:
    for integration in options.integration:
        assert integration in INTEGRATIONS
        configs = INTEGRATIONS[integration].screenshot_configs
        if configs is None:
            parser.error(f"Could not find screenshot configuration for integration {integration}. ")
        for screenshot_config in configs:
            generate_screenshot_from_config(integration, screenshot_config)

else:
    parser.error(
        "Could not find configuration for integration. "
        "You can specify a fixture file to use, using the --fixture flag. "
        "Or add a configuration to the corresponding integration in zerver.lib.integrations.INTEGRATIONS.",
    )
